# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
@file gradient.py
Gradient: compute dFi/dXj for a given field, up to all components in all directions.
MinMaxGradientStatistics: compute min(dFi/dXj), max(dFi/dXj) and/or max(|dFi/dXj|)
for a given field, up to all components in all directions.
"""
from hysop import vprint
from hysop.constants import Implementation, DirectionLabels, TranspositionState
from hysop.fields.continuous_field import Field, ScalarField, TensorField
from hysop.tools.htypes import check_instance, first_not_None, to_tuple
from hysop.tools.decorators import debug
from hysop.tools.numpywrappers import npw
from hysop.core.graph.graph import op_apply
from hysop.core.graph.computational_operator import ComputationalGraphOperator
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.core.graph.computational_node_frontend import ComputationalGraphNodeFrontend
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.operator.directional.directional import DirectionalOperatorGeneratorI
from hysop.operator.derivative import (
SpaceDerivative,
MultiSpaceDerivatives,
FiniteDifferencesSpaceDerivative,
SpectralSpaceDerivative,
)
from hysop.operator.min_max import (
MinMaxDerivativeStatistics,
MinMaxFiniteDifferencesDerivativeStatistics,
MinMaxSpectralDerivativeStatistics,
)
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.parameters.tensor_parameter import TensorParameter
class Gradient(MultiSpaceDerivatives):
"""
Generate multiple SpaceDerivative operators to compute the gradient of a Field.
"""
@classmethod
def implementations(cls):
return SpaceDerivative.implementations()
@debug
def __new__(
mcls,
F,
gradF,
directions=None,
implementation=None,
cls=FiniteDifferencesSpaceDerivative,
base_kwds=None,
**kwds,
):
base_kwds = {} if (base_kwds is None) else base_kwds
base_kwds.update(
dict(candidate_input_tensors=None, candidate_output_tensors=None)
)
return super().__new__(
mcls,
Fs=None,
dFs=None,
cls=cls,
candidate_input_tensors=(F,),
candidate_output_tensors=(gradF,),
derivatives=None,
directions=directions,
implementation=implementation,
base_kwds=base_kwds,
**kwds,
)
@debug
def __init__(
self,
F,
gradF,
directions=None,
implementation=None,
cls=FiniteDifferencesSpaceDerivative,
base_kwds=None,
**kwds,
):
"""
Create an operator generator that yields a sequence of operators
that compute the gradient of an input field F.
Given F, a scalar, vector or tensor field of dimension n,
compute the field of dimension n+1 that is the gradient of F:
ScalarField: F -> gradF[j] = dF/dxj
VectorField: F -> gradF[i,j] = dFi/dxj
TensorField: F -> gradF[i0,...,in,j] = dF[i0,...,in]/dxj
Derivatives can be computed with respect to specific directions and not necessarily
in all directions.
To restrict the number of components, take a tensor view on F (and gradF).
Example: if F is a VectorField of m components (F0, ..., Fm) in a domain of dimension n,
this operator will compute gradF[i,j] = dF[i]/dx[j].
( dF0/dx0 ... dF0/dxn )
( . . . )
grad(F) = ( . . . )
( . . . )
( dFm/dx0 ... dFm/dxn )
where F is an input field
gradF is an output field != F
0 <= i < nb_components
0 <= j < nb_directions
nb_components = F.nb_components
nb_directions = min( F.dim, len(directions))
Parameters
----------
F: hysop.field.continuous_field.Field
Continuous field as input (Scalar, Vector or TensorField).
All contained fields have to live on the same domain.
gradF: hysop.field.continuous_field.Field
Continuous field to be written. Its shape should be exactly
F.shape + (nb_directions,), or (nb_directions,) if F is a scalar field.
directions: tuple of ints, optional
The directions in which to take the derivatives.
Defaults to range(F.dim).
nb_directions = min(F.dim, len(directions))
implementation: Implementation, optional, defaults to None
target implementation, should be contained in available_implementations().
If None, implementation will be set to default_implementation().
kwds: dict, optional
Extra parameters passed towards base class (MultiSpaceDerivatives).
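
Examples
--------
A minimal sketch, assuming `velo` is an existing continuous VectorField on a
2D domain and `topo` is its topology descriptor (both names are illustrative,
not defined by this module); the variables mapping is passed through kwds as
for other operator frontends:

    # gradV gets shape velo.shape + (len(directions),)
    gradV = velo.gradient(directions=(0, 1), is_tmp=True)
    op = Gradient(F=velo, gradF=gradV, directions=(0, 1),
                  variables={velo: topo, gradV: topo})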
"""
base_kwds = first_not_None(base_kwds, {})
check_instance(F, Field)
check_instance(gradF, Field)
check_instance(directions, tuple, values=int, allow_none=True)
directions = to_tuple(first_not_None(directions, range(F.dim)))
ndirections = len(directions)
if F.is_tensor:
nfields = F.size
oshape = F.shape + (ndirections,)
else:
nfields = 1
oshape = (ndirections,)
if gradF.is_tensor:
if gradF.shape != oshape:
msg = "Gradient field shape mismatch, expected {} but got {}."
msg = msg.format(oshape, gradF.shape)
raise ValueError(msg)
else:
if oshape != (1,):
msg = "Gradient field shape mismatch, expected {} but got a scalar "
msg += "field (ie. shape={})."
msg = msg.format(oshape, (1,))
raise ValueError(msg)
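# Flatten the work to be generated: every scalar component of F is derived
# once per requested direction, dFs lists the matching components of gradF
# (row-major), the directions tuple is tiled per component and all
# derivatives are first order.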
Fs = tuple(f for f in F.fields for d in directions)
dFs = gradF.fields
directions = tuple(d for _ in range(nfields) for d in directions)
derivatives = (1,) * len(directions)
base_kwds.update(
dict(candidate_input_tensors=(F,), candidate_output_tensors=(gradF,))
)
if not issubclass(cls, (SpaceDerivative, MinMaxDerivativeStatistics)) or (
cls in (SpaceDerivative, MinMaxDerivativeStatistics)
):
msg = "cls should be a subclass of SpaceDerivative or MinMaxSpaceDerivativeStatistics, got {}."
msg += "\ncls MRO is:\n "
msg += "\n ".join(str(t) for t in cls.__mro__)
msg = msg.format(cls)
raise TypeError(msg)
implementation = first_not_None(implementation, cls.default_implementation())
super().__init__(
Fs=Fs,
dFs=dFs,
cls=cls,
candidate_input_tensors=(F,),
candidate_output_tensors=(gradF,),
derivatives=derivatives,
directions=directions,
implementation=implementation,
base_kwds=base_kwds,
**kwds,
)
class MinMaxGradientStatistics(Gradient):
"""
Interface for computing some statistics on the gradient of a field (minimum and maximum)
one component at a time to limit memory usage.
This will generate multiple MinMaxDerivativeStatistics operators.
"""
@debug
def __new__(
mcls,
F,
gradF=None,
directions=None,
coeffs=None,
Fmin=None,
Fmax=None,
Finf=None,
all_quiet=True,
print_tensors=True,
name=None,
pretty_name=None,
pbasename=None,
ppbasename=None,
variables=None,
implementation=None,
base_kwds=None,
cls=MinMaxFiniteDifferencesDerivativeStatistics,
**kwds,
):
return super().__new__(
mcls,
F=F,
gradF=gradF,
directions=directions,
extra_params=None,
cls=cls,
variables=variables,
**kwds,
)
@debug
def __init__(
self,
F,
gradF=None,
directions=None,
coeffs=None,
Fmin=None,
Fmax=None,
Finf=None,
all_quiet=True,
print_tensors=True,
name=None,
pretty_name=None,
pbasename=None,
ppbasename=None,
variables=None,
implementation=None,
base_kwds=None,
cls=MinMaxFiniteDifferencesDerivativeStatistics,
**kwds,
):
"""
Create an operator generator that yields a sequence of operators
that compute statistics on the gradient of an input field F.
MinMaxGradientStatistics can compute some commonly used Field statistics:
Fmin: component-wise and direction-wise min values of the gradient of the field.
Fmax: component-wise and direction-wise max values of the gradient of the field.
Finf: component-wise and direction-wise max values of the absolute value of the
gradient of the field (computed using Fmin and Fmax).
Derivatives can be computed with respect to specific directions and not necessarily in
all directions. To restrict the number of components, take a tensor view on F (and gradF).
Let k = idx + (j,)
gradF[k] = dF[idx]/dXd
Fmin[k] = Smin * min( dF[idx]/dXd )
Fmax[k] = Smax * max( dF[idx]/dXd )
Finf[k] = Sinf * max( |Fmin[k]|, |Fmax[k]| )
where F is an input field,
nb_components = F.nb_components = np.prod(F.shape)
nb_directions = min( F.dim, len(directions))
gradF is an optional output tensor field,
idx is contained in numpy.ndindex(*F.shape)
0 <= j < nb_directions
d = directions[j]
Fmin = created or supplied TensorParameter of shape F.shape + (nb_directions,).
Fmax = created or supplied TensorParameter of shape F.shape + (nb_directions,).
Finf = created or supplied TensorParameter of shape F.shape + (nb_directions,).
Smin = coeffs['Fmin'] or coeffs['Fmin'][k]
Smax = coeffs['Fmax'] or coeffs['Fmax'][k]
Sinf = coeffs['Finf'] or coeffs['Finf'][k]
Statistics are only computed if explicitly requested by the user,
unless they are needed to compute another requested statistic (see Notes).
Parameters
----------
F: Field
The continuous input field on which the gradient
will be taken and statistics will be computed.
gradF: Field, optional
Optional output field for the gradient.
If the gradient is required as an output, one can also use MinMaxStatistics
on a precomputed gradient (using the Gradient operator) instead of
MinMaxGradientStatistics.
directions: array like of ints, optional
The directions in which the statistics are computed,
defaults to all directions (ie. range(F.dim)).
coeffs: dict of scalar or array like of coefficients, optional
Optional scaling of the statistics.
Scaling factor should be a scalar or an array-like of scalars for
each component of gradF. If not given, defaults to 1 for all statistics.
Fmin, Fmax, Finf: TensorParameter or boolean, optional
At least one statistic should be specified (either by boolean or TensorParameter).
TensorParameters should be of shape F.shape + (nb_directions,), see Notes.
If set to True, the TensorParameter will be generated automatically.
Autogenerated TensorParameters that are not required by the user
but are generated anyway are set to be quiet.
All autogenerated TensorParameters can be retrieved as attributes
of this object.
all_quiet: bool, optional, defaults to True
Set all generated params to be quiet, even the ones that are explicitly
requested.
print_tensors: bool, optional, defaults to True
Should the phony operator print the tensor parameters during apply?
name: str, optional
Name template for generated operator names.
Defaults to MinMax({}) where {} will be replaced by gradF[k].name.
pretty_name: str, optional
Name template for generated operator pretty names.
Defaults to |+/-{}| where {} will be replaced by gradF[k].pretty_name.
pbasename: str, optional
Basename for created tensor parameters.
Defaults to gradF.name.
ppbasename: str, optional
Pretty basename for created tensor parameters.
Defaults to gradF.pretty_name.
variables: dict
Dictionary of fields as keys and topologies as values.
implementation: hysop.constants.Implementation, optional
Specify generated operator underlying backend implementation.
Target implementation, should be contained in
MinMaxDerivativeStatistics.available_implementations().
If None, implementation will be set to
MinMaxDerivativeStatistics.default_implementation().
base_kwds: dict
Base class keyword arguments.
kwds: dict
Additional kwds passed to chosen implementation.
Attributes
----------
Fmin, Fmax, Finf: TensorParameter
All generated tensor parameters.
Unused statistics are set to None.
Notes
-----
nb_components = F.nb_components
nb_directions = min(F.dim, len(directions)).
About statistics:
Finf requires to compute Fmin and Fmax.
Finf = Sinf * max( abs(Smin*Fmin), abs(Smax*Fmax))
where Sinf, Smin and Smax are the scaling coefficients defined in coeffs.
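
Examples
--------
A minimal sketch, assuming `velo` is an existing continuous field and `topo`
its topology descriptor (both names are illustrative, not defined by this
module):

    stats = MinMaxGradientStatistics(F=velo, Finf=True,
                                     variables={velo: topo})
    # Once the generated operators have been applied, stats.Finf is a
    # TensorParameter of shape F.shape + (nb_directions,) holding the
    # max absolute gradient values; stats.Fmin and stats.Fmax were
    # generated quietly as intermediates.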
"""
check_instance(F, Field)
check_instance(gradF, Field, allow_none=True)
check_instance(
directions,
tuple,
values=int,
allow_none=True,
minval=0,
maxval=F.dim - 1,
minsize=1,
unique=True,
)
check_instance(
coeffs, dict, keys=str, values=(int, float, npw.number), allow_none=True
)
check_instance(
variables,
dict,
keys=Field,
values=CartesianTopologyDescriptors,
allow_none=True,
)
check_instance(name, str, allow_none=True)
check_instance(pbasename, str, allow_none=True)
check_instance(ppbasename, str, allow_none=True)
check_instance(implementation, Implementation, allow_none=True)
check_instance(base_kwds, dict, allow_none=True)
check_instance(all_quiet, bool, allow_none=True)
if (
((Fmin is None) or (Fmin is False))
and ((Fmax is None) or (Fmax is False))
and ((Finf is None) or (Finf is False))
):
msg = "No statistics were requested."
msg += "\nPlease specify Fmin, Fmax and/or Finf by either setting "
msg += " their value to True, or by by passing an already existing "
msg += " tensor parameter."
raise ValueError(msg)
coeffs = first_not_None(coeffs, {})
variables = first_not_None(variables, {F: None})
all_quiet = first_not_None(all_quiet, False)
directions = to_tuple(first_not_None(directions, range(F.dim)))
nb_directions = len(directions)
if F.is_tensor:
oshape = F.shape + (nb_directions,)
else:
oshape = (nb_directions,)
if gradF is None:
gradF = F.gradient(directions=directions, is_tmp=True)
assert gradF.shape == oshape, gradF.shape
variables.setdefault(gradF, variables[F])
_names = {"Fmin": "{}_min", "Fmax": "{}_max", "Finf": "{}_inf"}
_pretty_names = {"Fmin": "{}₋", "Fmax": "{}₊", "Finf": "|{}|₊"}
pbasename = first_not_None(pbasename, gradF.name)
ppbasename = first_not_None(ppbasename, gradF.pretty_name)
names = {k: v.format(pbasename) for (k, v) in _names.items()}
pretty_names = {k: v.format(ppbasename) for (k, v) in _pretty_names.items()}
def make_param(k, quiet):
return TensorParameter(
name=names[k],
pretty_name=pretty_names[k],
dtype=F.dtype,
shape=oshape,
quiet=quiet,
)
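# Resolve each requested statistic: keep user-supplied TensorParameters as-is,
# auto-create the ones flagged True, quietly create Fmin/Fmax when only Finf
# was requested (Finf is derived from them), and leave the rest to None.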
parameters = {}
_parameters = dict(Fmin=Fmin, Fmax=Fmax, Finf=Finf)
for k, v in _parameters.items():
param = _parameters[k]
if isinstance(param, TensorParameter):
pass
elif param is True:
param = make_param(k, quiet=all_quiet)
elif (_parameters["Finf"] is not None) and ((k == "Fmin") or (k == "Fmax")):
param = make_param(k, quiet=True)
else:
param = None
setattr(self, k, param)
continue
assert param.shape == oshape
coeffs.setdefault(k, 1)
parameters[k] = param
setattr(self, k, param)
unused_coeffs = set(coeffs.keys()) - set(parameters.keys())
if unused_coeffs:
msg = "The following coefficients are not needed: {}"
msg = msg.format(unused_coeffs)
raise ValueError(msg)
name = first_not_None(name, "MinMax({})")
pretty_name = first_not_None(pretty_name, "|±{}|")
extra_params = {
"name": gradF.new_empty_array(),
"pretty_name": gradF.new_empty_array(),
"coeffs": coeffs,
"implementation": implementation,
}
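# Name each generated per-component operator and hand it a scalar view on the
# shared tensor parameters: stats[idx] aliases one entry of the full
# TensorParameter so the underlying MinMax operator writes directly into it.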
for idx, Fi in gradF.nd_iter():
for statname, stat in parameters.items():
if stat is None:
continue
pname = _names[statname].format(Fi.name)
ppname = _pretty_names[statname].format(Fi.pretty_name)
S = stat.view(idx=idx, name=pname, pretty_name=ppname)
stats = extra_params.setdefault(statname, gradF.new_empty_array())
stats[idx] = S
extra_params["name"][idx] = name.format(Fi.name)
extra_params["pretty_name"][idx] = pretty_name.format(Fi.pretty_name)
super().__init__(
F=F,
gradF=gradF,
directions=directions,
extra_params=extra_params,
cls=cls,
variables=variables,
**kwds,
)
# add a phony operator to gather parameter views
class MergeTensorViewsOperator(ComputationalGraphOperator):
@classmethod
def supports_mpi(cls):
return True
@op_apply
def apply(self, **kwds):
super().apply(**kwds)
if not print_tensors:
return
for k, v in _parameters.items():
if (v is not None) and (v is not False):
param = parameters[k]
msg = f">Parameter {param.pretty_name} set to:\n{param.value}"
vprint(msg)
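# The phony operator consumes every per-component parameter view written by
# the generated MinMax operators and exposes the full tensor parameters as
# outputs, so it is scheduled after all of them in the graph.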
_phony_input_params = set()
_phony_output_params = set()
for pname in _names.keys():
if pname in extra_params:
param = parameters[pname]
_phony_input_params.update({p for p in extra_params[pname].ravel()})
_phony_output_params.update({param})
op = MergeTensorViewsOperator(
name=name.format(gradF.name),
pretty_name=pretty_name.format(gradF.pretty_name),
input_params=_phony_input_params,
output_params=_phony_output_params,
)
self._phony_op = op
@debug
def _generate(self):
"""Generate all operators."""
operators = super()._generate()
operators += (self._phony_op,)
return operators
@debug
def generate_direction(self, i, dt_coeff):
# See MultiSpaceDerivatives for the directional interface
ops = super().generate_direction(i=i, dt_coeff=dt_coeff)
if ops and (i == max(self.directions)):
ops += (self._phony_op,)
return ops